/**
 * 
 */
package com.ws.framework.remoteservice.core.protocal;

import java.util.Collection;
import java.util.List;
import java.util.Map.Entry;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;

import com.google.common.base.Predicate;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.ws.framework.remoteservice.core.model.ProviderInfo;
import com.ws.framework.remoteservice.core.model.ServiceKey;
import com.ws.framework.remoteservice.core.register.CuratorClient;
import com.ws.framework.remoteservice.core.util.WrsiConstants;
import com.ws.framework.remoteservice.core.util.WsProtocolParser;

/**
 * @author WSH
 *
 */
public class DockerManager {

	private final Logger logger = Logger.getLogger(this.getClass().getName());

	private final ArrayListMultimap<ServiceKey, Docker> registered = ArrayListMultimap.create();

	public static final DockerManager instance = new DockerManager();

	private CuratorFramework client = CuratorClient.get();

	private final ExecutorService backgroundPool = Executors
			.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("registry_watch_thread").build());

	private final TreeCache treeCache = new TreeCache(client, WrsiConstants.Starter + WrsiConstants.Prefix);

	private final ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();

	private volatile boolean startWatched = false;

	private volatile boolean initPing = false;

	private Timer timer;

	private final int delay = 30000;

	private DockerManager() {
	}

	public void addDocker(ServiceKey serviceKey, Docker docker) {
		readWriteLock.readLock().lock();
		try {
			List<Docker> dockers = registered.get(serviceKey);
			if (dockers.isEmpty() || dockers.contains(docker))
				registered.put(serviceKey, docker);
		} finally {
			readWriteLock.readLock().unlock();
		}
		if (!initPing) {
			timer = new Timer("Heart_Beat_Timer");
			timer.schedule(new TimerTask() {

				@Override
				public void run() {
					probeAllNodes();
				}
			}, delay, delay);
			initPing = true;
		}
		if (!startWatched) {
			try {
				addWatch();
			} catch (Exception e) {
				// ignore
			}
			startWatched = true;
		}
	}

	private void probeAllNodes() {
		readWriteLock.readLock().lock();
		try {
			Collection<Docker> nodes = registered.values();
			for (Docker docker : nodes) {
				try {
					docker.ping();
				} catch (Exception ex) {
				}
			}
		} finally {
			readWriteLock.readLock().unlock();
		}
	}

	/**
	 * @throws Exception
	 */
	private void addWatch() throws Exception {
		treeCache.start();
		treeCache.getListenable().addListener(new TreeCacheListener() {
			@Override
			public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
				Entry<ServiceKey, ProviderInfo> pair = null;
				try {
					pair = WsProtocolParser.decode(event.getData().getPath());
				} catch (Exception e) {
					if (e instanceof IllegalArgumentException)
						logger.warning("Decode the invalid event:" + event);
					else {
						// ignore
						if (logger.isLoggable(Level.FINE)) {
							logger.log(Level.FINE, e.getMessage());
						}
					}
				}
				if (null == pair || null == pair.getKey() || null == pair.getValue())
					return;
				if (event.getType().equals(Type.NODE_REMOVED)) {
					readWriteLock.readLock().lock();
					try {
						Collection<Docker> cs = registered.get(pair.getKey());
						if (null != cs) {
							for (Docker c : cs) {
								if (c.getInfo().toString().equals(pair.getValue().toString())) {
									readWriteLock.writeLock().lock();
									try {
										registered.get(pair.getKey()).remove(c);
									} finally {
										readWriteLock.writeLock().unlock();
									}
								}
							}
						}
					} finally {
						readWriteLock.readLock().unlock();
					}
				} else if (event.getType().equals(Type.NODE_ADDED)) {
					addDocker(pair.getKey(), new Docker(pair.getValue()));
				} else {
					// ignore
				}

			}
		}, backgroundPool);
	}

	public List<Docker> getDocker(ServiceKey serviceKey) {
		List<Docker> list = Lists.newArrayList();
		readWriteLock.readLock().lock();
		try {
			list = registered.get(serviceKey);
			Collection<Docker> c = Collections2.filter(list, new Predicate<Docker>() {
				@Override
				public boolean apply(Docker input) {
					return input.isAlive();
				}
			});
			return Lists.newArrayList(c);
		} finally {
			readWriteLock.readLock().unlock();
		}
	}

	public List<ProviderInfo> getProviderInfos(ServiceKey serviceKey) {
		List<ProviderInfo> list = Lists.newArrayList();
		try {
			List<String> providers = client.getChildren().forPath(
					WsProtocolParser.providerPrefixEncode(serviceKey.getContractName(), serviceKey.getImplCode()));
			for (String path : providers) {
				list.add(WsProtocolParser.addressDecode(path));
			}
		} catch (Exception e) {
			logger.warning("Zk获取provider信息出错... e:" + e.getMessage());
			// ignore
		}
		return list;
	}

	public synchronized void destroy() {
		backgroundPool.shutdown();
		for (Docker docker : registered.values()) {
			docker.destroy();
		}
		if (null != timer)
			timer.cancel();
	}
}
